一、github源码
Kafka-0.8.2.1 producer&consumer 示例源码github地址:
https://github.com/shiyueqi/kafka-test
二、Kafka安装与配置
三、Kafka官方示例
Kafka wiki
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
0.8.2版本的producer代码
https://github.com/apache/kafka/blob/0.8.2/examples/src/main/java/kafka/examples/Producer.java
0.8.2版本的consumer代码
https://github.com/apache/kafka/blob/0.8.2/examples/src/main/java/kafka/examples/Consumer.java
0.10.2.0版本的producer代码
https://github.com/apache/kafka/blob/0.10.2/examples/src/main/java/kafka/examples/Producer.java
0.10.2.0版本的consumer代码
https://github.com/apache/kafka/blob/0.10.2/examples/src/main/java/kafka/examples/Consumer.java
四、Kafka-0.8.2.1 producer
maven依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
</dependencies>
代码
package com.unionpay.kafka.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal Kafka 0.8.2.1 producer example: synchronously sends a fixed number of
 * string messages to {@link #KAFKA_TOPIC} and closes the producer when the JVM exits.
 *
 * <p>{@link #run()} may be executing on the main thread while the JVM shutdown hook
 * calls {@link #shutdown()}, so the closed state is tracked with a volatile flag and
 * {@code shutdown()} is synchronized and idempotent.
 *
 * date: 2017/04/21 14:54.
 * author: Yueqi Shi
 */
public class ProducerMain {
    /** Topic the messages are sent to. */
    public static final String KAFKA_TOPIC = "topic_1";
    /** Comma-separated broker list used as the producer's bootstrap servers. */
    private static final String BROKERS_ADDRESS = "172.18.55.21:9092,172.18.55.21:9093";
    /** Value of the producer "acks" setting. */
    private static final int REQUEST_REQUIRED_ACKS = 1;
    /** Prefix of every message payload; the loop index is appended to it. */
    public static final String MESSAGE = "message";
    private static final String CLIENT_ID = "producer_test_id";
    /** Number of messages sent by {@link #run()}. */
    private static final int MESSAGE_COUNT = 10000;

    private final KafkaProducer<String, String> producer;
    /** Set once {@link #shutdown()} has been called; read by the send loop. */
    private volatile boolean closed;

    /**
     * Creates the underlying {@link KafkaProducer} and registers a JVM shutdown hook
     * that closes it.
     */
    public ProducerMain() {
        producer = new KafkaProducer<String, String>(buildProperties());
        // Registered last so the hook never observes an unassigned producer field.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.print("Kafka producer graceful shutdown.");
                ProducerMain.this.shutdown();
            }
        });
    }

    /**
     * Builds the producer configuration: broker list, String key/value serializers,
     * acks and client id.
     *
     * @return the properties passed to the {@link KafkaProducer} constructor
     */
    private Properties buildProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS));
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
        return props;
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages one at a time, waiting on the returned
     * future of each send before issuing the next one.
     *
     * <p>A failed send is reported and the loop continues. The loop stops early if
     * the producer has been shut down or the calling thread is interrupted; in the
     * latter case the thread's interrupt flag is restored before returning.
     */
    public void run() {
        for (int i = 0; i < MESSAGE_COUNT && !closed; i++) {
            String message = ProducerMain.MESSAGE + "_" + i;
            try {
                // get() blocks on the send's future, making the send synchronous.
                producer.send(new ProducerRecord<String, String>(ProducerMain.KAFKA_TOPIC, message)).get();
                System.out.println("produce: " + message + " success.");
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("produce: " + message + " fail.");
                // Restore the flag for callers and stop: every further get() would
                // fail immediately while the thread remains interrupted.
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
                System.out.println("produce: " + message + " fail.");
            }
        }
    }

    /**
     * Closes the producer. Safe to call more than once and from any thread; only the
     * first call closes the underlying {@link KafkaProducer}.
     */
    public synchronized void shutdown() {
        if (closed) {
            return;
        }
        closed = true;
        producer.close();
    }

    /**
     * Misspelled former name of {@link #shutdown()}, kept so existing callers compile.
     *
     * @deprecated use {@link #shutdown()} instead
     */
    @Deprecated
    public void shuntown() {
        shutdown();
    }

    /**
     * Entry point: builds a producer and runs the send loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new ProducerMain().run();
    }
}
代码说明
需指定topic名称
Kafka的broker集群地址
五、Kafka-0.8.2.1 consumer
maven依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
</dependencies>
代码
ConsumerExecutorService类,即消费消息后,执行类
package com.unionpay.kafka.test;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
/**
 * Worker that drains a single {@link KafkaStream} and prints every message it
 * receives. Intended to be submitted to an executor, one instance per stream.
 *
 * date: 2017/04/24 13:53.
 * author: Yueqi Shi
 */
public class ConsumerExecutorService implements Runnable {
    /**
     * Charset used to decode message payloads. Explicit so the result does not depend
     * on the platform default; referenced by fully qualified name so that no extra
     * import is required.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    private final KafkaStream<byte[], byte[]> stream;

    /**
     * @param stream the stream of raw key/value byte arrays this worker consumes
     */
    public ConsumerExecutorService(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    /**
     * Iterates over the stream, decoding each message payload as UTF-8 text and
     * printing it. Returns when the iterator reports no further messages.
     */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message(), UTF_8);
            System.out.println("consume: " + message);
        }
    }
}
ConsumerMain类
package com.unionpay.kafka.test;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Minimal Kafka 0.8.2.1 high-level consumer example: connects through ZooKeeper,
 * opens {@code THREADS_NUM} streams on {@link #KAFKA_TOPIC} and hands each stream to
 * a {@link ConsumerExecutorService} running on a fixed thread pool.
 *
 * date: 2017/04/21 15:11.
 * author: Yueqi Shi
 */
public class ConsumerMain {
    /** Topic to consume from. */
    public static final String KAFKA_TOPIC = "topic_1";
    /** Comma-separated ZooKeeper connect string. */
    private static final String ZOOKEEPER_ADDRESS = "172.18.55.21:2181,172.18.55.21:2182";
    /** Number of streams requested for the topic and size of the worker pool. */
    private static final int THREADS_NUM = 1;
    private static final String CLIENT_ID = "consumer_test_id";
    /** How long {@link #shutdown()} waits for the worker threads, in milliseconds. */
    private static final long SHUTDOWN_TIMEOUT_MS = 5000;

    private final ConsumerConnector consumerConnector;
    private final ExecutorService executor;

    /**
     * Creates the consumer connector and the worker thread pool.
     */
    public ConsumerMain() {
        consumerConnector = Consumer.createJavaConsumerConnector(buildProperties());
        executor = Executors.newFixedThreadPool(THREADS_NUM);
    }

    /**
     * Builds the consumer configuration (ZooKeeper address, group id, timeouts,
     * auto-commit interval, offset reset policy and client id).
     *
     * @return the configuration used to create the consumer connector
     */
    private ConsumerConfig buildProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", ZOOKEEPER_ADDRESS);
        props.put("group.id", "test_group");
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put("client.id", CLIENT_ID);
        return new ConsumerConfig(props);
    }

    /**
     * Starts consuming: requests the message streams for the topic and submits one
     * worker per stream to the executor. Returns immediately; consumption continues
     * on the pool threads.
     */
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KAFKA_TOPIC, THREADS_NUM);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(KAFKA_TOPIC);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerExecutorService(stream));
        }
    }

    /**
     * Shuts down the consumer connector, then the thread pool, and waits up to
     * {@code SHUTDOWN_TIMEOUT_MS} for the workers to finish. If the wait is
     * interrupted, the calling thread's interrupt flag is restored.
     */
    public void shutdown() {
        // Connector first, then the pool that runs the stream workers.
        consumerConnector.shutdown();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Entry point: creates the consumer, registers a JVM shutdown hook that calls
     * {@link #shutdown()}, and starts consuming.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ConsumerMain consumer = new ConsumerMain();
        // Without this hook shutdown() is never invoked and the connector and
        // worker pool are simply abandoned when the process is terminated.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.shutdown();
            }
        });
        consumer.run();
    }
}
代码说明
- 需指定topic
- 需指定Zookeeper集群地址
- 需指定group.id。在Kafka的consumer中,是以group为单位进行消费:同一个group中,一条消息被其中一个consumer实例消费后,组内其他consumer实例不会再消费该消息。
六、版权声明
转载请注明出处:https://shiyueqi.github.io/2017/04/27/Kafka-0.8.2.1 producer&consumer示例/
Author: Yueqi Shi
Date: 2017-04-27 11:20:00 AM